package cn.zhang.violet.etl


import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Spark batch job that converts comma-separated raw log files into Parquet.
  *
  * Each input line is split on commas, lines with fewer than 85 fields are
  * dropped, the first 85 fields are converted to the column types declared in
  * the schema below, and the result is written as snappy-compressed Parquet.
  * An already existing output path is deleted before writing.
  *
  * Exit code 101 signals invalid command-line arguments.
  */
object Bz2Parquet {

    /** Minimum number of comma-separated fields a log line must have to be kept. */
    private final val MinFieldCount = 85

    /**
      * Column names and types of the output table.
      * The order must match the positional conversion done in `toRow`.
      */
    private val schema: StructType = StructType(
        Seq(
            StructField("sessionid", StringType),
            StructField("advertisersid", IntegerType),
            StructField("adorderid", IntegerType),
            StructField("adcreativeid", IntegerType),
            StructField("adplatformproviderid", IntegerType),
            StructField("sdkversion", StringType),
            StructField("adplatformkey", StringType),
            StructField("putinmodeltype", IntegerType),
            StructField("requestmode", IntegerType),
            StructField("adprice", DoubleType),
            StructField("adppprice", DoubleType),
            StructField("requestdate", StringType),
            StructField("ip", StringType),
            StructField("appid", StringType),
            StructField("appname", StringType),
            StructField("uuid", StringType),
            StructField("device", StringType),
            StructField("client", IntegerType),
            StructField("osversion", StringType),
            StructField("density", StringType),
            StructField("pw", IntegerType),
            StructField("ph", IntegerType),
            StructField("long", StringType),
            StructField("lat", StringType),
            StructField("provincename", StringType),
            StructField("cityname", StringType),
            StructField("ispid", IntegerType),
            StructField("ispname", StringType),
            StructField("networkmannerid", IntegerType),
            StructField("networkmannername", StringType),
            StructField("iseffective", IntegerType),
            StructField("isbilling", IntegerType),
            StructField("adspacetype", IntegerType),
            StructField("adspacetypename", StringType),
            StructField("devicetype", IntegerType),
            StructField("processnode", IntegerType),
            StructField("apptype", IntegerType),
            StructField("district", StringType),
            StructField("paymode", IntegerType),
            StructField("isbid", IntegerType),
            StructField("bidprice", DoubleType),
            StructField("winprice", DoubleType),
            StructField("iswin", IntegerType),
            StructField("cur", StringType),
            StructField("rate", DoubleType),
            StructField("cnywinprice", DoubleType),
            StructField("imei", StringType),
            StructField("mac", StringType),
            StructField("idfa", StringType),
            StructField("openudid", StringType),
            StructField("androidid", StringType),
            StructField("rtbprovince", StringType),
            StructField("rtbcity", StringType),
            StructField("rtbdistrict", StringType),
            StructField("rtbstreet", StringType),
            StructField("storeurl", StringType),
            StructField("realip", StringType),
            StructField("isqualityapp", IntegerType),
            StructField("bidfloor", DoubleType),
            StructField("aw", IntegerType),
            StructField("ah", IntegerType),
            StructField("imeimd5", StringType),
            StructField("macmd5", StringType),
            StructField("idfamd5", StringType),
            StructField("openudidmd5", StringType),
            StructField("androididmd5", StringType),
            StructField("imeisha1", StringType),
            StructField("macsha1", StringType),
            StructField("idfasha1", StringType),
            StructField("openudidsha1", StringType),
            StructField("androididsha1", StringType),
            StructField("uuidunknow", StringType),
            StructField("userid", StringType),
            StructField("iptype", IntegerType),
            StructField("initbidprice", DoubleType),
            StructField("adpayment", DoubleType),
            StructField("agentrate", DoubleType),
            StructField("lrate", DoubleType),
            StructField("adxrate", DoubleType),
            StructField("title", StringType),
            StructField("keywords", StringType),
            StructField("tagid", StringType),
            StructField("callbackdate", StringType),
            StructField("channelid", StringType),
            StructField("mediatype", IntegerType)
        )
    )

    /**
      * Converts the split fields of one log line into a [[Row]] matching `schema`.
      *
      * @param fields the comma-separated fields of a line; must contain at least
      *               `MinFieldCount` elements (callers filter shorter lines out)
      * @return a row with one value per schema column, in schema order
      */
    private def toRow(fields: Array[String]): Row = {
        // Brings the toIntPlus / toDoublePlus extension methods into scope.
        // NOTE(review): presumably these parse with a fallback for malformed values
        // -- confirm in cn.zhang.violet.bean.NewString.
        import cn.zhang.violet.bean.NewString._
        Row(
            fields(0), fields(1).toIntPlus, fields(2).toIntPlus, fields(3).toIntPlus, fields(4).toIntPlus,
            fields(5), fields(6), fields(7).toIntPlus, fields(8).toIntPlus, fields(9).toDoublePlus,
            fields(10).toDoublePlus, fields(11), fields(12), fields(13), fields(14), fields(15), fields(16),
            fields(17).toIntPlus, fields(18), fields(19), fields(20).toIntPlus, fields(21).toIntPlus, fields(22),
            fields(23), fields(24), fields(25), fields(26).toIntPlus, fields(27), fields(28).toIntPlus, fields(29),
            fields(30).toIntPlus, fields(31).toIntPlus, fields(32).toIntPlus, fields(33), fields(34).toIntPlus,
            fields(35).toIntPlus, fields(36).toIntPlus, fields(37), fields(38).toIntPlus, fields(39).toIntPlus,
            fields(40).toDoublePlus, fields(41).toDoublePlus, fields(42).toIntPlus, fields(43),
            fields(44).toDoublePlus, fields(45).toDoublePlus, fields(46), fields(47), fields(48), fields(49),
            fields(50), fields(51), fields(52), fields(53), fields(54), fields(55), fields(56),
            fields(57).toIntPlus, fields(58).toDoublePlus, fields(59).toIntPlus, fields(60).toIntPlus, fields(61),
            fields(62), fields(63), fields(64), fields(65), fields(66), fields(67), fields(68), fields(69), fields(70),
            fields(71), fields(72), fields(73).toIntPlus, fields(74).toDoublePlus, fields(75).toDoublePlus,
            fields(76).toDoublePlus, fields(77).toDoublePlus, fields(78).toDoublePlus, fields(79), fields(80),
            fields(81), fields(82), fields(83), fields(84).toIntPlus
        )
    }

    /**
      * Job entry point.
      *
      * @param args exactly two elements: the raw log input path and the Parquet
      *             output path; any other argument count prints the usage text
      *             and exits with status 101
      */
    def main(args: Array[String]): Unit = {

        // Validate the arguments.
        if (args.length != 2) {
            println(
                """
                  |Usage:
                  | cn.zhang.violet.etl.Bz2Parquet
                  | args:
                  |     dataInputPath:  原始日志输入路径
                  |     dataOutputPath: parquet存储路径
                """.stripMargin)
            sys.exit(101) // 101: invalid arguments (visible to the shell via $?)
        }

        // Destructure the two arguments by pattern matching.
        val Array(dataInputPath, dataOutputPath) = args

        val sparkConf = new SparkConf()
        sparkConf.setAppName("日志转parquet")
        // Default to local mode, but do not override a master supplied externally
        // (e.g. via spark-submit --master).
        sparkConf.setIfMissing("spark.master", "local[*]")
        // Serializer used by the Spark job.
        sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        val sc = new SparkContext(sparkConf)

        try {
            // Read the raw data.
            val rawData: RDD[String] = sc.textFile(dataInputPath)

            // RDD[String] => RDD[Row]
            val rowData: RDD[Row] = rawData
                .map(_.split(",", -1)) // limit -1 keeps trailing empty strings
                .filter(_.length >= MinFieldCount)
                .map(fields => toRow(fields))

            val sQLContext = new SQLContext(sc)
            // Compression codec for the Parquet files.
            sQLContext.setConf("spark.sql.parquet.compression.codec", "snappy")
            val dataFrame = sQLContext.createDataFrame(rowData, schema)

            // Delete the output path if it already exists.
            // The file system is resolved from the path itself, so an explicit scheme
            // (file:///, hdfs://host:9000/, ...) is honoured; a path without a scheme
            // falls back to the client's fs.defaultFS setting.
            val path = new Path(dataOutputPath)
            val fs: FileSystem = path.getFileSystem(sc.hadoopConfiguration)
            if (fs.exists(path)) {
                fs.delete(path, true)
            }

            // Alternative: write partitioned by province and city
            // dataFrame.write.partitionBy("provincename", "cityname").parquet(dataOutputPath)
            // Write as Parquet without partitioning.
            dataFrame.write.parquet(dataOutputPath)
        } finally {
            // Release the SparkContext even when the job fails.
            sc.stop()
        }
    }

}
